SI618 Project 1¶

Email: hyfrankl@umich.edu

Task 0: Data Exploration and Preprocess¶

In [ ]:
# !pip install mrjob
# !pip install pandas
# !pip install seaborn
# !pip install nltk
# !pip install datetime
# !pip install wordcloud

# nltk.download('stopwords')
# nltk.download('omw-1.4')
# nltk.download('wordnet')
# nltk.download('punkt')
# nltk.download('averaged_perceptron_tagger')

# restart the kernel
from authorContrib import AuthorContrib, preload, load_data
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import nltk
from nltk.stem import WordNetLemmatizer
from datetime import datetime
import string
import re

ss = SparkSession.builder.appName('si618prj').getOrCreate()
In [ ]:
%%time
# Use SparkSQL to extract data
data = preload(ss, input="data/metadata.csv")
CPU times: user 5.38 ms, sys: 2.89 ms, total: 8.28 ms
Wall time: 45.3 s
In [ ]:
!cp metadata_sub.csv/*.csv subdata.csv

Task 0: Data Manipulation with CORD-19 Dataset¶

Sub-Task 1. Analyze the number of published papers with MRJob

Use MRJob to get the basic information

  • the number of published papers per person in each month
In [ ]:
mr_job = AuthorContrib(args=["subdata.csv", "-o", "output"])
mr_job.make_runner().run()
No configs specified for inline runner
In [ ]:
!cat output/part* > task1.txt

Identify frequent unigrams and bigrams in abstract with PySpark

In [ ]:
# MRJob output is tab-separated "author\tyear\tmonth\tcount" with no header,
# so columns arrive as _c0.._c3; rename them to meaningful names.
author_contribution = ss.read.csv('task1.txt', sep='\t', header=False) \
        .withColumnRenamed('_c0', 'author') \
        .withColumnRenamed('_c1', 'year') \
        .withColumnRenamed('_c2', 'month') \
        .withColumnRenamed('_c3', 'count')

# Expose as a SQL view for the queries below.
author_contribution.createOrReplaceTempView('auth_paper')

The distribution of the number of published papers per person between 2002 and 2022

In [ ]:
publish_distribution = ss.sql("""
        select author, sum(count) as cnt from auth_paper 
        where year between 2002 and 2022 group by author""").toPandas()
In [ ]:
# Histogram of papers-per-author; the y-axis is log-scaled because the
# distribution is heavily right-skewed (most authors publish only a few papers).
publish_distribution['cnt'].hist()
plt.xlabel('Number of Published Paper')
plt.ylabel('Number of People')
plt.title('The Distribution of the Number of Published Paper per Person ')
plt.yscale('log')

Who frequently appears in the top 5 authors with the most published papers (at least 5) between 2002 and 2022

  • Definition of Top 5: the top 5 authors with the most published papers (at least 5) between 2002 and 2022
In [ ]:
# Innermost query: per (author, year) paper counts, keeping only counts >= 5.
# Middle query: rank authors within each year by count (ties broken by name)
# and keep the yearly top 5. Outer query: count how many years each author
# makes the top 5; authors appearing more than twice are the "candidates".
important_candidates = ss.sql("""
    select author, count(1) as c from (
        select year, author, cnt from (
            select year, author, cnt, row_number() over(partition by year order by cnt desc, author) as nrow from (
                select author, year, sum(count) as cnt from auth_paper where year between 2002 and 2022 group by author, year having cnt >= 5
            )
        ) where nrow <= 5
    ) group by author having c > 2 order by c desc
""")
important_candidates.createOrReplaceTempView('df4')

Sub-Task 2: Frequent Unigrams/Bigrams in the Abstract Along Year 2019 - 2022

Select the monthly frequent unigrams/bigrams (at least 10 times) in the abstract between 2019 and 2022

In [ ]:
# Reload the raw metadata to pull abstracts: keep only rows published between
# 2019 and 2022 that actually have an abstract.
metadata = ss.read.option('header', True).csv("data/metadata.csv")
metadata.createOrReplaceTempView('metadata')
abstract_data = ss.sql("""
    select publish_time, abstract from metadata 
    where year(publish_time) between 2019 and 2022 and abstract is not null""")
In [ ]:
st = ['patients', 'studies', 'study', 'cases', 'results', 'analysis', 'p',
      'cases', 'years', 'months', 'days', 'd', 'data', 'time', 'number', 'measures', 'countries']
In [ ]:
def remove_helper(w1, w2, t1, t2):
    """Keep the noun-flagged words of a candidate bigram.

    Given two adjacent words and their noun flags, return the sublist of
    words whose flag is truthy: [w1, w2], [w1], or [w2]. Returns None when
    neither flag is set (callers filter on ``t1 or t2`` beforehand).
    """
    kept = [word for word, is_noun in ((w1, t1), (w2, t2)) if is_noun]
    return kept or None

def remove_stopwords(sentence, stop):
    """Tokenize a sentence and return noun-bearing unigram/bigram word lists.

    Lowercases and lemmatizes each token, drops tokens found in `stop`, in
    NLTK's English stopword list, or consisting purely of punctuation, then
    POS-tags the survivors and keeps each adjacent word pair in which at
    least one word is noun-tagged (delegating to remove_helper).

    Returns [] on any failure (e.g. missing NLTK data, non-string input),
    keeping the Spark pipeline best-effort.
    """
    try:
        from nltk.corpus import stopwords
        # Build the filter sets once per sentence instead of per token
        # (the original rebuilt set(stop) for every token and scanned the
        # stopword *list* linearly). Local name renamed so it no longer
        # shadows the module-level `st` stopword list.
        english_stopwords = set(stopwords.words('english'))
        custom_stopwords = set(stop)
        punct_re = re.compile('[' + string.punctuation + ']+')
        lemmatizer = WordNetLemmatizer()
        words = [
            lemmatizer.lemmatize(w.lower())
            for w in nltk.word_tokenize(sentence)
            if (w.lower() not in custom_stopwords)
            and (w.lower() not in english_stopwords)
            and not punct_re.fullmatch(w)
        ]
        # One noun flag per cleaned token (True when POS tag starts with 'NN').
        marks = [t.startswith('NN') for w, t in nltk.pos_tag(words) if len(w) != 0]
        bigrams = [
            remove_helper(w1, w2, t1, t2)
            for w1, w2, t1, t2 in zip(words[:-1], words[1:], marks[:-1], marks[1:])
            if t1 or t2
        ]
        return bigrams
    except Exception:  # was a bare `except:`; don't swallow KeyboardInterrupt/SystemExit
        return []
    

def publish_month(x):
    """Convert a 'YYYY-MM-DD' date string to 'YYYY-M' (month not zero-padded).

    Returns "" for anything that is not a full ISO date (None, '2020',
    '2020-05', garbage); callers filter rows on a non-empty result.
    """
    try:
        date = datetime.strptime(x, "%Y-%m-%d")
    except (TypeError, ValueError):  # non-string input or malformed/partial date
        return ""
    return f"{date.year}-{date.month}"
In [ ]:
# Build "YYYY-M<TAB>ngram<TAB>count" records from abstracts: keep rows with a
# parseable publish date, split abstracts into sentences, extract noun-bearing
# unigrams/bigrams, and keep ngrams seen at least 10 times in a month.
q2 = abstract_data.rdd.filter(lambda x: len(publish_month(x[0])) != 0) \
        .map(lambda x: (publish_month(x[0]), x[1])) \
        .flatMapValues(lambda x: nltk.sent_tokenize(x))
q3 = q2.flatMapValues(lambda x: remove_stopwords(x, st))
q4 = q3.map(lambda x: (x[0] + "\t" + " ".join(x[1]), 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .filter(lambda x: x[1] >= 10) \
        .map(lambda x: x[0] + "\t" + str(x[1]))

# FIX: removed the preceding `q4.collect()` — it pulled the full result to the
# driver, discarded it, and forced the whole pipeline to execute twice.
q4.saveAsTextFile('prj_task2')
In [ ]:
!cat prj_task2/* >task2.txt

Word Cloud for 2019, 2020, 2021, 2022

In [ ]:
word_freq = pd.read_csv("task2.txt", sep='\t', header=None)
In [ ]:
from wordcloud import WordCloud

# One word cloud per year. Rows of task2.txt are "YYYY-M\tngram\tcount", so
# column 0 is the month string, 1 the ngram, and 2 the count.
for year in ['2019','2020', '2021', '2022']:
    # Top-100 ngrams for the year, summed across its months, as {ngram: count}.
    keywords = word_freq[word_freq[0].str.startswith(year)][[1, 2]] \
        .groupby(1).sum().sort_values(2, ascending=False).iloc[:100].to_dict()[2]
    wordcloud = WordCloud()
    wordcloud.generate_from_frequencies(frequencies=keywords)
    plt.figure(figsize=[8, 4], dpi=300)
    plt.imshow(wordcloud, interpolation="bilinear")
    plt.title(f'Word Cloud for {year}')
    plt.axis("off")
    plt.show()

Main Task. The Relationship with Newly Confirmed Cases¶

In [ ]:
# from freqNgrams import load_data
confirmed_cases = load_data('data/confirmed_global.csv')
In [ ]:
sns.set_theme()

Task 1: The Relationship with Published Researchers

In [ ]:
def cnt_mapper(x):
    """Map a contribution row to ("YYYYQn", {author}) for set-union counting.

    The month (1-12) is folded into its calendar quarter (1-4); the author
    is wrapped in a singleton set so the reducer can union them.
    """
    quarter = (int(x['month']) + 2) // 3
    quarter_key = f"{x['year']}Q{quarter}"
    return (quarter_key, {x['author']})

def cnt_reducer(x, y):
    """Reduce step: union two author sets belonging to the same quarter."""
    return x.union(y)
In [ ]:
# Distinct authors per quarter: map each (author, year, month) row to
# (quarter, {author}), union the singleton sets per quarter, then count
# the size of each resulting author set.
people = author_contribution.rdd.filter(lambda x: x['month'] != None).map(lambda x: cnt_mapper(x))\
            .reduceByKey(lambda x,y: cnt_reducer(x,y)).mapValues(len).collect()
people_df = pd.DataFrame(people, columns=['datetime', 'num'])

confirmed_with_people = confirmed_cases.merge(people_df, on='datetime', how='inner')
# Quarters after 2022Q2 are incomplete in the dataset, so drop them.
confirmed_with_people.drop(index=confirmed_with_people.index[confirmed_with_people['datetime'] > "2022Q2"], axis=0, inplace=True)
In [ ]:
sns.set_style()

fig, axes = plt.subplots(2, 1, figsize=(12, 12), sharex=True)
sns.lineplot(data=confirmed_with_people, x='datetime', y='sum', ax=axes[0])
axes[0].set_ylabel('Number of Newly Confirmed Cases')
# axes[0].set_yscale('log')

sns.lineplot(data=confirmed_with_people, x='datetime', y='num', ax=axes[1])
axes[1].set_ylabel('Number of Published Researchers')
# axes[1].set_yscale('log')
Out[ ]:
Text(0, 0.5, 'Number of Published Researchers')
In [ ]:
np.corrcoef(confirmed_with_people['sum'][:-2], confirmed_with_people['num'][2:])[0, 1]
Out[ ]:
0.09254640944316803
In [ ]:
confirmed_with_people.to_csv('task1.csv')

Task 2. The Relationship with Published Paper

In [ ]:
pop_author = ss.sql("""
    select author, year, floor((month+2)/3) as quarter, sum(count) as cnt 
    from auth_paper where author in (select author from df4) group by author, year, quarter
""")
In [ ]:
# Pivot: one row per (year, quarter), one column per candidate author.
# FIX: use the string 'sum' aggregator — passing np.sum to aggfunc is
# deprecated in recent pandas and resolves to the same reduction.
candidate_contribution = pd.pivot_table(data=pop_author.toPandas(), 
        index=['year', 'quarter'], columns='author', values='cnt', aggfunc='sum', fill_value=0).reset_index()
candidate_contribution['year'] = candidate_contribution['year'].astype(int)

df_join2 = confirmed_cases.merge(candidate_contribution, on=['year', 'quarter'], how='inner')
In [ ]:
fig, axes = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

axes[0].set_title("Newly Confirmed Cases vs. Time")
axes[0].bar(df_join2['datetime'], df_join2['sum'])
axes[0].set_ylabel('Number of Newly Confirmed Cases')  # FIX: typo was 'Confimed'

axes[1].set_title("Newly Published Paper vs. Time")
# Columns 4+ are the per-author counts (the first four come from the
# confirmed-cases frame — presumably year/quarter/datetime/sum; verify
# against load_data if that schema changes).
for col in df_join2.columns[4:]:
    axes[1].plot(df_join2['datetime'], df_join2[col])
axes[1].set_ylabel('Number of Newly Published Paper')
axes[1].legend(df_join2.columns[4:], bbox_to_anchor=(1.05, 1))
    
fig.supxlabel('Time')
plt.show()
In [ ]:
df_join2.to_csv('task2.csv')

Sub-Task 3. The Relationship with Ngrams

In [ ]:
def quarter_freq(df, keys):
    """Aggregate monthly ngram counts to quarters and compute relative frequencies.

    Parameters
    ----------
    df : pandas DataFrame with integer columns 0 ('YYYY-M' month string),
        1 (ngram), 2 (occurrence count).
    keys : ngrams of interest; the output gains one '<key>_freq' column per
        key — its quarterly count divided by the quarter's total ngram count.

    Returns a frame with columns ['year', 'quarter', 'total', 'datetime',
    '<key>_freq', ...]. Only quarters where at least one key occurs survive
    the inner merge. FIX: the input frame is no longer mutated (the original
    added 'year'/'quarter' columns to the caller's DataFrame in place).
    """
    monthly = df.copy()
    monthly['year'] = monthly[0].apply(lambda x: x.split('-')[0])
    monthly['quarter'] = monthly[0].apply(lambda x: (int(x.split('-')[1]) + 2) // 3)
    monthly = monthly.drop(columns=[0])

    # Total ngram occurrences per quarter.
    totals = monthly.drop(columns=[1]).groupby(['year', 'quarter'], as_index=False) \
        .sum().rename({2: 'total'}, axis=1)
    # Per-key occurrences per quarter ('sum' instead of deprecated np.sum).
    key_counts = pd.pivot_table(
        data=monthly[monthly[1].isin(keys)],
        index=['year', 'quarter'],
        columns=1, values=2,
        aggfunc='sum').reset_index()

    totals['datetime'] = totals['year'].apply(str) + "Q" + totals['quarter'].apply(str)
    combined = totals.merge(key_counts, on=['year', 'quarter'])
    for key in keys:
        combined[key + "_freq"] = combined[key] / combined['total']
    combined = combined.drop(columns=keys)
    combined['year'] = combined['year'].astype(int)
    return combined
In [ ]:
def draw_freq(word_freq, keys, confirmed_cases, mark):
    """Plot confirmed cases and quarterly ngram frequencies on a shared time axis.

    word_freq : raw ngram-count frame (columns 0/1/2) fed to quarter_freq.
    keys : ngrams plotted as lines on the lower (log-scale) panel.
    confirmed_cases : frame with 'year'/'quarter'/'datetime'/'sum' columns
        (joined on the first three; 'sum' drives the bar chart).
    mark : label prefix for the cases panel (e.g. "Newly").

    Returns the merged per-quarter DataFrame used for plotting.
    """
    merged = quarter_freq(word_freq, keys) \
        .merge(confirmed_cases, on=['year', 'quarter', 'datetime']) \
        .sort_values(['year', 'quarter'])
    # Quarters past 2022Q3 are too sparse to be meaningful.
    merged.drop(index=merged.index[merged['datetime'] > "2022Q3"], inplace=True)

    fig, axes = plt.subplots(2, 1, figsize=(12, 16), sharex=True)

    axes[0].set_title(f"{mark} Confirmed Cases vs. Time")
    axes[0].bar(merged['datetime'], merged['sum'])
    axes[0].set_ylabel(f"{mark} Confirmed Cases")

    ax = axes[1]
    # FIX: title/ylabel/yscale were previously set inside the loop, so the
    # y-axis ended up labeled with only the LAST key even though every key
    # is plotted; set them once for the whole panel instead.
    ax.set_title("Ngrams Frequency vs. Time")
    ax.set_ylabel("Frequency")
    ax.set_yscale('log')
    for key in keys:
        ax.plot(merged['datetime'], merged[key + '_freq'])

    ax.legend(keys, bbox_to_anchor=(1.05, 1))
    fig.supxlabel('Time')
    return merged
In [ ]:
keys = ['health', 'infection', 'disease', 'vaccine', 'respiratory', 'protein', 'rna', 'antibody', 'drug']
combined_key = draw_freq(word_freq, keys, confirmed_cases, "Newly")
plt.show()
In [ ]:
combined_key
Out[ ]:
year quarter total datetime health_freq infection_freq disease_freq vaccine_freq respiratory_freq protein_freq rna_freq antibody_freq drug_freq sum
0 2020 2 1686772 2020Q2 0.007974 0.011325 0.013451 0.001655 0.004643 0.002528 0.001216 0.001720 0.003447 9606125.0
1 2020 3 2386642 2020Q3 0.007789 0.009260 0.011296 0.001632 0.003399 0.002785 0.001051 0.001866 0.003155 23559695.0
2 2020 4 2361162 2020Q4 0.008518 0.009332 0.010951 0.002104 0.003352 0.002533 0.000977 0.002090 0.003100 49745195.0
3 2021 1 2715100 2021Q1 0.008961 0.008706 0.009985 0.003068 0.002854 0.002585 0.000925 0.002265 0.002667 45573297.0
4 2021 2 3030952 2021Q2 0.008919 0.008570 0.009286 0.003799 0.002421 0.002639 0.000902 0.002405 0.002367 53362089.0
5 2021 3 2973443 2021Q3 0.008793 0.008476 0.008774 0.004558 0.002165 0.002449 0.000847 0.002568 0.002289 51634564.0
6 2021 4 3231277 2021Q4 0.009070 0.008293 0.008081 0.004781 0.001963 0.002244 0.000712 0.002629 0.002148 54423652.0
7 2022 1 3259854 2022Q1 0.008725 0.007804 0.007926 0.004713 0.001758 0.002362 0.000797 0.002509 0.002224 200071900.0
8 2022 2 1712846 2022Q2 0.009134 0.007761 0.007405 0.004840 0.001734 0.002449 0.000837 0.002508 0.002288 59000957.0
9 2022 3 2360 2022Q3 0.043644 0.016949 0.026695 NaN NaN 0.012288 0.009322 NaN 0.005932 69980210.0
In [ ]:
for key in keys:
    print(key, "coeff:", np.corrcoef(combined_key['sum'][:-2], combined_key[key+'_freq'][2:])[0, 1])
health coeff: 0.9636081568728317
infection coeff: 0.9086832469293848
disease coeff: 0.8987619082608881
vaccine coeff: nan
respiratory coeff: nan
protein coeff: 0.9555158388586743
rna coeff: 0.9546490843698231
antibody coeff: nan
drug coeff: 0.8669767811060866

Extra Exploration¶

  • There exist some papers published in 2022Q3 and 2022Q4 (a small portion)
In [ ]:
data.createOrReplaceTempView('check')
In [ ]:
ss.sql("""select doi, publish_time from check 
        where year(publish_time) = 2022 and floor((month(publish_time)+2)/3) >= 3 limit 5""").take(5)
Out[ ]:
[Row(doi='10.1016/j.jad.2022.05.060', publish_time='2022-08-15'),
 Row(doi='10.1098/rsta.2020.0411', publish_time='2022-07-11'),
 Row(doi='10.1212/nxi.0000000000001183', publish_time='2022-07-01'),
 Row(doi='10.24272/j.issn.2095-8137.2021.480', publish_time='2022-07-18'),
 Row(doi='10.1080/16549716.2022.2058170', publish_time='2022-12-31')]